ParallelService Class

Thread-safe parallel processing with built-in observability and concurrency control

reference
parallel-processing
concurrency
performance
threading
observability
Author

Diginsight Components

Published

March 17, 2026

The ParallelService offers thread-safe parallel processing with built-in observability, concurrency control, and flexible configuration options.

In particular, it provides dynamically configurable concurrency levels for CPU and I/O bound operations in .NET applications.

ParallelService is part of Diginsight.Components.

The service supports both synchronous and asynchronous parallel operations with predefined concurrency levels (Low, Medium, High) or custom parallelism settings.

Table of Contents

📋 Overview

The ParallelService provides a unified interface for executing parallel operations with controlled concurrency. It automatically manages thread allocation, provides built-in observability through Diginsight telemetry, and supports both CPU-bound and I/O-bound workloads with optimized execution strategies.

Key Features

  • Controlled Concurrency: Predefined Low, Medium, and High concurrency levels with configurable limits
  • Flexible Execution: Support for both synchronous and asynchronous parallel operations
  • Built-in Observability: Automatic telemetry and activity tracking through Diginsight observability
  • Environment Overrides: Runtime concurrency control via environment variables
  • WhenAll Support: Parallel execution of task factories with result collection
  • Tuple Decomposition: Type-safe parallel execution with structured result handling
  • Exception Safety: Graceful handling of break conditions and operation failures
  • Configuration Flexibility: Dynamic and volatile configuration support

Concurrency Levels

The service provides three predefined concurrency levels optimized for different workload types:

Level Default Value Typical Use Case
Low 3 I/O-bound operations, external API calls
Medium 6 Mixed workloads, moderate CPU usage
High 12 CPU-intensive operations, computational tasks

All levels respect the global MaxConcurrency setting when configured.

🔍 Additional Details

Synchronous vs Asynchronous Operations

The ParallelService supports both execution patterns with different underlying implementations:

Synchronous Operations (ForEach): - Uses Parallel.ForEach from TPL - Best for CPU-bound operations - Automatic work-stealing and load balancing - Thread pool based execution

Asynchronous Operations (ForEachAsync, WhenAllAsync): - Uses Parallel.ForEachAsync (.NET 6+) or custom implementation (.NET Framework/Core) - Optimized for I/O-bound operations - Async/await pattern support - Better resource utilization for async workloads

Concurrency Control

The service implements a hierarchical concurrency control system:

  1. Global MaxConcurrency: Sets upper limit for all operations
  2. Level-specific Settings: LowConcurrency, MediumConcurrency, HighConcurrency
  3. Runtime Overrides: Environment variables can override settings
  4. Default Fallbacks: Built-in defaults when no configuration is provided
// Concurrency resolution logic
int ResolvedConcurrency = Math.Min(
    options.MaxConcurrency > 0 ? options.MaxConcurrency : int.MaxValue,
    specificLevelConcurrency > 0 ? specificLevelConcurrency : defaultValue
);

Environment Variable Overrides

The service supports runtime configuration through the MaxConcurrency environment variable:

# Set maximum concurrency for all operations
export MaxConcurrency=8

# Or in Windows
set MaxConcurrency=8

This override takes precedence over configuration files and is useful for: - Production tuning without redeployment - Environment-specific optimization - Load testing scenarios - Container resource limiting

Break Loop Exception Handling

The service provides controlled loop termination through BreakLoopException:

// Usage in parallel operations
await parallelService.ForEachAsync(items, options, async item =>
{
    if (shouldStop)
    {
        throw new BreakLoopException { Data = { ["item"] = item } };
    }
    await ProcessAsync(item);
});

The exception allows graceful termination of parallel loops while preserving context information for debugging and telemetry.

⚙️ Configuration

Configuration in appsettings.json

{
  "ParallelServiceOptions": {
    "MaxConcurrency": 16,
    "LowConcurrency": 4,
    "MediumConcurrency": 8,
    "HighConcurrency": 16
  }
}

Configuration in the startup sequence

Register the ParallelService in your dependency injection container:

// Basic registration
services.AddScoped<IParallelService, ParallelService>();

// With configuration
services.Configure<ParallelServiceOptions>(options =>
{
    options.MaxConcurrency = 16;    // Global maximum
    options.LowConcurrency = 4;     // For I/O operations
    options.MediumConcurrency = 8;  // For mixed workloads
    options.HighConcurrency = 16;   // For CPU operations
});

// Register options from configuration
services.Configure<ParallelServiceOptions>(
    configuration.GetSection("ParallelServiceOptions"));

Dynamic Configuration

The service supports dynamic configuration updates through the options pattern:

// Dynamic configuration update
public class ParallelServiceController : ControllerBase
{
    private readonly IOptionsMonitor<ParallelServiceOptions> _optionsMonitor;

    public ParallelServiceController(IOptionsMonitor<ParallelServiceOptions> optionsMonitor)
    {
        _optionsMonitor = optionsMonitor;
    }

    [HttpPost("update-concurrency")]
    public IActionResult UpdateConcurrency([FromBody] ParallelServiceOptions newOptions)
    {
        // Options will be automatically updated and applied to new operations
        // Note: Existing operations continue with their original settings
        return Ok();
    }
}

💡 Usage Examples

Basic ForEach Operations

public class DocumentProcessor
{
    private readonly IParallelService _parallelService;

    public DocumentProcessor(IParallelService parallelService)
    {
        _parallelService = parallelService;
    }

    public void ProcessDocuments(IEnumerable<Document> documents)
    {
        // CPU-intensive processing with high concurrency
        var options = new ParallelOptions 
        { 
            MaxDegreeOfParallelism = _parallelService.HighConcurrency 
        };

        _parallelService.ForEach(documents, options, document =>
        {
            // Synchronous processing
            ProcessDocument(document);
        });
    }

    public void ValidateDocuments(IEnumerable<Document> documents)
    {
        // Light validation with low concurrency
        var options = new ParallelOptions 
        { 
            MaxDegreeOfParallelism = _parallelService.LowConcurrency 
        };

        _parallelService.ForEach(documents, options, document =>
        {
            ValidateDocument(document);
        });
    }
}

Async ForEach Operations

public class ApiDataProcessor
{
    private readonly IParallelService _parallelService;
    private readonly HttpClient _httpClient;

    public ApiDataProcessor(IParallelService parallelService, HttpClient httpClient)
    {
        _parallelService = parallelService;
        _httpClient = httpClient;
    }

    public async Task ProcessUrlsAsync(IEnumerable<string> urls)
    {
        // I/O-bound operations with medium concurrency
        var options = new ParallelOptions 
        { 
            MaxDegreeOfParallelism = _parallelService.MediumConcurrency 
        };

        await _parallelService.ForEachAsync(urls, options, async url =>
        {
            try
            {
                var response = await _httpClient.GetAsync(url);
                var content = await response.Content.ReadAsStringAsync();
                await ProcessContentAsync(content);
            }
            catch (Exception ex)
            {
                // Handle individual URL failures
                LogError($"Failed to process {url}: {ex.Message}");
            }
        });
    }
}

WhenAll Operations

public class DataAggregationService
{
    private readonly IParallelService _parallelService;

    public async Task<AggregatedData> GetAggregatedDataAsync()
    {
        var taskFactories = new List<Func<Task>>
        {
            () => LoadUserDataAsync(),
            () => LoadProductDataAsync(),
            () => LoadOrderDataAsync(),
            () => LoadInventoryDataAsync()
        };

        var options = new ParallelOptions 
        { 
            MaxDegreeOfParallelism = _parallelService.HighConcurrency 
        };

        // Execute all tasks in parallel
        await _parallelService.WhenAllAsync(taskFactories, options);

        return new AggregatedData();
    }

    public async Task<List<T>> ProcessMultipleEndpointsAsync<T>(
        IEnumerable<Func<Task<T>>> endpointCalls)
    {
        var options = new ParallelOptions 
        { 
            MaxDegreeOfParallelism = _parallelService.LowConcurrency 
        };

        var results = await _parallelService.WhenAllAsync(endpointCalls, options);
        return results.ToList();
    }
}

Tuple Decomposition

public class UserProfileService
{
    private readonly IParallelService _parallelService;

    public async Task<UserProfileViewModel> GetUserProfileAsync(int userId)
    {
        var options = new ParallelOptions 
        { 
            MaxDegreeOfParallelism = _parallelService.MediumConcurrency 
        };

        // Execute multiple related operations in parallel
        var (user, preferences, activities, notifications) = 
            await _parallelService.WhenAllAsync(
                () => GetUserAsync(userId),
                () => GetUserPreferencesAsync(userId), 
                () => GetRecentActivitiesAsync(userId),
                () => GetNotificationsAsync(userId),
                options
            );

        return new UserProfileViewModel
        {
            User = user,
            Preferences = preferences,
            Activities = activities,
            Notifications = notifications
        };
    }

    // Also supports 2-tuple and 3-tuple overloads
    public async Task<(UserData, UserPreferences)> GetBasicProfileAsync(int userId)
    {
        var options = new ParallelOptions 
        { 
            MaxDegreeOfParallelism = _parallelService.LowConcurrency 
        };

        return await _parallelService.WhenAllAsync(
            () => GetUserAsync(userId),
            () => GetUserPreferencesAsync(userId),
            options
        );
    }
}

🔧 Troubleshooting

Common Issues

1. Poor Performance with High Concurrency

High concurrency doesn’t always mean better performance:

// Problem: Too much concurrency for I/O operations
var options = new ParallelOptions 
{ 
    MaxDegreeOfParallelism = _parallelService.HighConcurrency  // 12 threads
};
await _parallelService.ForEachAsync(apiCalls, options, CallExternalApiAsync);

// Solution: Use appropriate concurrency level
var options = new ParallelOptions 
{ 
    MaxDegreeOfParallelism = _parallelService.LowConcurrency  // 3 threads
};
await _parallelService.ForEachAsync(apiCalls, options, CallExternalApiAsync);

2. Thread Starvation

When mixing CPU and I/O operations:

// Problem: Blocking the thread pool
_parallelService.ForEach(items, options, item =>
{
    // Blocking I/O operation on thread pool thread
    var result = httpClient.GetAsync(item.Url).Result;
    ProcessResult(result);
});

// Solution: Use async version
await _parallelService.ForEachAsync(items, options, async item =>
{
    var result = await httpClient.GetAsync(item.Url);
    ProcessResult(result);
});

3. Configuration Not Applied

Ensure proper service registration and configuration:

// Verify registration order
services.Configure<ParallelServiceOptions>(configuration.GetSection("ParallelServiceOptions"));
services.AddScoped<IParallelService, ParallelService>();

// Verify configuration values
var options = serviceProvider.GetRequiredService<IOptions<ParallelServiceOptions>>();
Console.WriteLine($"MaxConcurrency: {options.Value.MaxConcurrency}");

Performance Considerations

CPU-Bound Operations: - Use ForEach for better performance - Set concurrency to CPU core count or slightly higher - Avoid excessive context switching

I/O-Bound Operations: - Use ForEachAsync and WhenAllAsync - Keep concurrency moderate (3-10) to avoid overwhelming external services - Use appropriate timeout settings

Memory Usage: - Monitor memory consumption with large datasets - Consider batching for very large collections - Be aware of closure captures in lambda expressions

Debugging

Enable detailed logging for troubleshooting:

services.Configure<LoggerFilterOptions>(options =>
{
    options.AddFilter("Diginsight.Components.ParallelService", LogLevel.Debug);
});

Use the built-in observability features:

// The service automatically creates activities for method calls
// Check your telemetry system for:
// - Method execution times
// - Concurrency levels used
// - Exception patterns

📚 Reference

Classes and Interfaces

  • IParallelService: Main service interface for parallel operations
  • ParallelService: Default implementation with configurable concurrency levels
  • IParallelServiceOptions: Configuration interface for concurrency settings
  • ParallelServiceOptions: Configuration implementation with dynamic/volatile support
  • BreakLoopException: Exception for controlled loop termination

Methods

ForEach Operations

void ForEach<TSource>(
    IEnumerable<TSource> source, 
    ParallelOptions parallelOptions, 
    Action<TSource> body)
Task ForEachAsync<TSource>(
    IEnumerable<TSource> source, 
    ParallelOptions parallelOptions, 
    Func<TSource, Task> body)

WhenAll Operations

Task WhenAllAsync(
    IEnumerable<Func<Task>> taskFactories, 
    ParallelOptions parallelOptions)
Task<IEnumerable<T>> WhenAllAsync<T>(
    IEnumerable<Func<Task<T>>> taskFactories, 
    ParallelOptions parallelOptions)

Tuple Decomposition Overloads

Task<(T1, T2)> WhenAllAsync<T1, T2>(
    Func<Task<T1>> taskFactory1, 
    Func<Task<T2>> taskFactory2, 
    ParallelOptions parallelOptions)

Task<(T1, T2, T3)> WhenAllAsync<T1, T2, T3>(
    Func<Task<T1>> taskFactory1, 
    Func<Task<T2>> taskFactory2, 
    Func<Task<T3>> taskFactory3, 
    ParallelOptions parallelOptions)

Task<(T1, T2, T3, T4)> WhenAllAsync<T1, T2, T3, T4>(
    Func<Task<T1>> taskFactory1, 
    Func<Task<T2>> taskFactory2, 
    Func<Task<T3>> taskFactory3, 
    Func<Task<T4>> taskFactory4, 
    ParallelOptions parallelOptions)

Configuration Properties

Property Type Default Description
MaxConcurrency int 0 Global maximum concurrency limit (0 = unlimited)
LowConcurrency int 3 Concurrency level for I/O-bound operations
MediumConcurrency int 6 Concurrency level for mixed workloads
HighConcurrency int 12 Concurrency level for CPU-intensive operations

Default Values

private const int LOWCONCURRENCY_DEFAULT = 3;
private const int MEDIUMCONCURRENCY_DEFAULT = 6;
private const int HIGHCONCURRENCY_DEFAULT = 12;

💡 Best Practices

Choosing Concurrency Levels

Low Concurrency (3 threads): - External API calls - Database queries - File I/O operations - Network requests to rate-limited services

Medium Concurrency (6 threads): - Mixed CPU/I/O workloads - Image/document processing - Data transformation tasks - Moderate computational work

High Concurrency (12 threads): - Pure CPU-bound calculations - Mathematical computations - Cryptographic operations - Parallel algorithms

Thread Safety

Ensure thread-safe operations in parallel bodies:

// Thread-safe: Each operation works on independent data
await _parallelService.ForEachAsync(users, options, async user =>
{
    user.ProcessedAt = DateTime.UtcNow;  // Safe: each user is independent
    await SaveUserAsync(user);           // Safe: if SaveUserAsync is thread-safe
});

// Not thread-safe: Shared state modification
var totalProcessed = 0;
_parallelService.ForEach(items, options, item =>
{
    ProcessItem(item);
    totalProcessed++;  // Race condition!
});

// Thread-safe alternative: Use concurrent collections
var totalProcessed = 0;
_parallelService.ForEach(items, options, item =>
{
    ProcessItem(item);
    Interlocked.Increment(ref totalProcessed);  // Safe atomic operation
});

Resource Management

Properly manage resources in parallel operations:

public class ResourceManagedProcessor
{
    private readonly IParallelService _parallelService;
    private readonly SemaphoreSlim _semaphore;

    public ResourceManagedProcessor(IParallelService parallelService)
    {
        _parallelService = parallelService;
        _semaphore = new SemaphoreSlim(5, 5); // Limit concurrent resource usage
    }

    public async Task ProcessItemsAsync(IEnumerable<Item> items)
    {
        var options = new ParallelOptions 
        { 
            MaxDegreeOfParallelism = _parallelService.MediumConcurrency 
        };

        await _parallelService.ForEachAsync(items, options, async item =>
        {
            await _semaphore.WaitAsync();
            try
            {
                using var resource = CreateExpensiveResource();
                await ProcessWithResourceAsync(item, resource);
            }
            finally
            {
                _semaphore.Release();
            }
        });
    }
}

Key Guidelines: - Always consider the nature of your workload (CPU vs I/O bound) - Monitor resource usage and adjust concurrency accordingly - Use appropriate exception handling for individual operations - Leverage the built-in observability for performance monitoring - Test with realistic data volumes and network conditions

Back to top